package com.iteaj.iot.client;

import com.iteaj.iot.FrameworkManager;
import com.iteaj.iot.IotThreadManager;
import com.iteaj.iot.ProtocolException;
import com.iteaj.iot.client.codec.ClientProtocolEncoder;
import com.iteaj.iot.client.component.SocketClientComponent;
import com.iteaj.iot.client.handle.ClientServiceHandler;
import com.iteaj.iot.client.protocol.ClientSocketProtocol;
import com.iteaj.iot.codec.adapter.SocketMessageDecoderDelegation;
import com.iteaj.iot.event.ClientStatus;
import com.iteaj.iot.event.StatusEvent;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ScheduledFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

import static com.iteaj.iot.CoreConst.*;

/**
 * Base class for socket clients driven by a Netty {@link Bootstrap}.
 * <p>
 * It builds the channel pipeline (decoder, encoder, service handler), connects to the
 * remote address described by {@link ClientConnectProperties}, and schedules reconnect
 * attempts after a failed connect.
 *
 * @see TcpSocketClient
 * @see UdpSocketClient
 */
public abstract class SocketClient implements IotClient {

    /**
     * Channel currently used by this client. It is assigned by the channel initializer
     * installed in {@link #init(Object)} or by {@link #setChannel(Channel)}, and is
     * {@code null} until one of them has run.
     */
    private Channel channel;

    private Bootstrap bootstrap;

    /**
     * Delay before a reconnect attempt, in seconds. Reconnecting is skipped when the
     * value is not greater than zero.
     */
    private long reconnectTime;

    /**
     * Pending reconnect task, or {@code null} when none is tracked.
     */
    private ScheduledFuture<?> reconnectSchedule;

    private ClientConnectProperties config;

    private SocketClientComponent clientComponent;

    public Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Creates a client bound to the given component and connection configuration.
     *
     * @param clientComponent component this client belongs to
     * @param config connection configuration, must not be {@code null}
     * @throws IllegalArgumentException if {@code config} is {@code null}
     */
    public SocketClient(SocketClientComponent clientComponent, ClientConnectProperties config) {
        // Validate first so a missing config fails with the descriptive error below
        // instead of a NullPointerException from config.getReconnect()
        if(config == null) {
            throw new IllegalArgumentException("未指定连接配置[ConnectProperties]");
        }

        this.config = config;
        this.clientComponent = clientComponent;
        this.reconnectTime = config.getReconnect();
    }

    /**
     * Builds the {@link Bootstrap} and installs the channel initializer that wires the
     * codec and service handlers into every new channel.
     *
     * @param config not used by this implementation
     */
    @Override
    public void init(Object config) {
        this.bootstrap = new Bootstrap()
            .group(IotThreadManager.instance().getWorkerGroup())
            .channel(channel())
            .handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel channel) throws Exception {
                    // Close the previous channel if it is still active
                    if(SocketClient.this.channel != null
                            && SocketClient.this.channel.isActive()) {
                        SocketClient.this.channel.close();
                    }

                    SocketClient.this.channel = channel;
                    ChannelPipeline pipeline = channel.pipeline();

                    // Create the decoder and give it a delegation if it has none yet
                    ChannelInboundHandler decoder = createProtocolDecoder();
                    if(decoder instanceof SocketMessageDecoderDelegation) {
                        if(((SocketMessageDecoderDelegation<?>) decoder).getDelegation() == null) {
                            ((SocketMessageDecoderDelegation<?>) decoder).setDelegation(getClientComponent());
                        }
                    }

                    // Attach the connection configuration of this client to the channel
                    channel.attr(CLIENT_KEY).set(SocketClient.this.getConfig());
                    // Attach the component that owns this connection
                    channel.attr(COMPONENT).set(getClientComponent());

                    // Both codecs are added with addFirst, so the encoder ends up ahead of the decoder
                    pipeline.addFirst(CLIENT_DECODER_HANDLER, decoder);
                    pipeline.addFirst(CLIENT_ENCODER_HANDLER, createProtocolEncoder());

                    // The service handler goes to the end of the pipeline
                    pipeline.addLast(CLIENT_SERVICE_HANDLER, new ClientServiceHandler(getClientComponent()));

                    // Subclass specific handlers
                    SocketClient.this.doInitChannel(channel);

                }
        });

        this.doInitOptions(this.bootstrap);
    }

    /**
     * Connects to the remote host unless the client is already connected.
     *
     * @return the connect future, or an already succeeded future when the channel is active;
     * a failed future when connecting threw and a channel exists to create it from
     * @throws IllegalStateException if connecting threw while no channel exists yet;
     * the original exception is kept as the cause
     */
    @Override
    public ChannelFuture connect() {
        try {
            // Only connect while the channel is not active
            if(!isConnect()) {
                return this.doConnect();
            } else {
                return getChannel().newSucceededFuture();
            }
        } catch (Exception e) {
            Channel current = getChannel();
            if(current == null) {
                // Without a channel no failed future can be created; rethrow with the
                // cause attached rather than masking it with a NullPointerException
                throw new IllegalStateException(e.getMessage(), e);
            }

            return current.newFailedFuture(new ProtocolException(e.getMessage(), e));
        }
    }

    /**
     * Handles completion of a connect attempt.
     * <p>
     * On success the pending reconnect task is cancelled, the client is registered with
     * its component and {@link #successCallback(ChannelFuture)} is invoked. On failure
     * the error is logged and a reconnect is scheduled when the client qualifies.
     *
     * @param future the completed connect future
     */
    protected void finishedConnect(Future future) {
        if(future.isSuccess()) {
            if(this.cancelReconnect()) {
                this.reconnectTime = getConfig().getReconnect(); // restore the reconnect delay
            }

            this.reconnectSchedule = null;
            // The client must be stored after a successful connect, otherwise it cannot be looked up later.
            // Per the original note, adding a client checks whether it already exists.
            clientComponent.addClient(getConfig(), this);
            // Success callback
            successCallback((ChannelFuture) future);
            if (logger.isInfoEnabled()) {
                logger.info("客户端({}) 连接服务器成功 - 远程主机 {}:{} - 客户端标识：{}", getClientComponent().getName()
                        , this.getHost(), this.getPort(), getConfig().connectKey());
            }
        } else {
            this.reconnectSchedule = null;
            logger.error("客户端({}) 连接服务器失败 - 远程主机 {}:{} - 客户端标识：{}", getClientComponent().getName()
                    , this.getHost(), this.getPort(), getConfig().connectKey(), future.cause());

            // Reconnect only if 1. this is the component's default client, or
            // 2. this client is registered with the component (it connected successfully before)
            if(clientComponent.getClient() == this || clientComponent.getClient(getConfig()) != null) {
                this.reconnection();
            }
        }
    }

    /**
     * Performs the actual connect to the server.
     * @see MultiClientManager#addClient(Object, IotClient) <br>
     *     Note: after a successful connect the client has to be added to the client manager
     */
    protected ChannelFuture doConnect() {
        return this.bootstrap.connect(getConfig().remoteSocketAddress(), getConfig().localSocketAddress())
                .addListener(future -> finishedConnect(future));
    }

    /**
     * Disconnects the channel when it is active.
     * <p>
     * NOTE(review): when no channel has been created yet the not-connected branch fails
     * with a NullPointerException, since the returned future is created from the channel.
     *
     * @return the disconnect future, or an already succeeded future when not connected
     */
    @Override
    public ChannelFuture disconnect() {
        if(isConnect()) {
            this.channel.attr(CLIENT_CLOSED_NORMAL).set(Boolean.TRUE);
            this.channel.attr(CLIENT_CLOSED_TYPE).set(Integer.valueOf(2));
            return this.channel.disconnect().addListener(future -> {
                if(future.isSuccess()) {
                    this.cancelReconnect();
                } else {
                    // Disconnect failed
                    this.channel.attr(CLIENT_CLOSED_NORMAL).set(Boolean.FALSE);
                    logger.error("客户端断线({}) disconnect(失败) - 客户端标识：{}", getName(), getConfig(), future.cause());
                }
            });
        } else {
            return getChannel().newSucceededFuture();
        }
    }

    /**
     * Closes the connection and stops any pending reconnect attempt.
     * <p>
     * NOTE(review): when no channel has been created yet the not-connected branch fails
     * with a NullPointerException, since the returned future is created from the channel.
     *
     * @return the close future, or an already succeeded future when not connected
     */
    @Override
    public ChannelFuture close() {
        if(isConnect()) {
            this.channel.attr(CLIENT_CLOSED_NORMAL).set(Boolean.TRUE);
            this.channel.attr(CLIENT_CLOSED_TYPE).set(Integer.valueOf(1));
            return this.channel.close().addListener(future -> {
                if(future.isSuccess()) {
                    // Cancel the reconnect task
                    this.cancelReconnect();
                } else {
                    // Close failed
                    this.channel.attr(CLIENT_CLOSED_NORMAL).set(null);
                    logger.error("客户端正常关闭({}) 关闭失败 - 客户端标识：{}", getName(), getConfig(), future.cause());
                }
            });
        } else {
            // A client that is closed while waiting to reconnect must not come back
            // on its own, so the pending reconnect task is cancelled here as well
            this.cancelReconnect();
            return getChannel().newSucceededFuture();
        }
    }

    /**
     * Cancels the reconnect task if one is tracked and has not completed yet.
     *
     * @return {@code true} when a pending task was found and cancellation was requested
     */
    private boolean cancelReconnect() {
        ScheduledFuture<?> pending = this.reconnectSchedule;
        if(pending != null && !pending.isDone()) {
            pending.cancel(true);
            return true;
        }

        return false;
    }

    /**
     * Hook for handling a successful disconnect; this implementation does nothing.
     * <p>
     * NOTE(review): the original comment described a default of "remove=true removes the
     * client, remove=false reconnects"; no such logic is present in this class.
     */
    protected void disconnectSuccessCall() {

    }

    /**
     * Applies channel options to the bootstrap; enables {@code SO_KEEPALIVE} by default.
     *
     * @param bootstrap the bootstrap being configured
     */
    protected void doInitOptions(Bootstrap bootstrap) {
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    }

    /**
     * @return the channel class handed to {@link Bootstrap#channel(Class)}
     */
    protected abstract Class<? extends Channel> channel();

    /**
     * Connection configuration.
     * @return the configuration passed to the constructor
     */
    @Override
    public ClientConnectProperties getConfig() {
        return this.config;
    }

    /**
     * Whether the client is in the reconnecting state.
     * @return {@code true} when a reconnect task is tracked and has not been cancelled
     */
    public boolean isReconnect() {
        return this.reconnectSchedule != null && !this.reconnectSchedule.isCancelled();
    }

    /**
     * Schedules a reconnect attempt.
     * <p>
     * Nothing is scheduled when the client is connected, the component is not started,
     * a reconnect task is already tracked, or the reconnect delay is not positive.
     * @see ClientConnectProperties#getReconnect() reconnect delay
     */
    public synchronized void reconnection() {
        if(!this.isConnect() && this.getClientComponent().isStart() && this.reconnectSchedule == null && this.reconnectTime > 0) {
            logger.warn("客户端({}) 断线重连 - 等待重连时间：{}(s) - 远程主机 {}:{} - 客户端标识：{}", getClientComponent().getName()
                    , reconnectTime, this.getHost(), this.getPort(), getConfig().connectKey());
            this.reconnectSchedule = IotThreadManager.instance().getDeviceManageEventExecutor().schedule(() -> {
                if(!this.isConnect()) { // still not connected
                    this.connect(); // connect
                }
            }, this.reconnectTime, TimeUnit.SECONDS);
        }
    }

    /**
     * Writes out a protocol message.
     * @see ClientProtocolEncoder protocol encoder, the place where the message is actually written
     * @param clientProtocol the protocol to write
     * @return the write future
     */
    public ChannelFuture writeAndFlush(ClientSocketProtocol clientProtocol) {
        // Explicit cast: pass a null varargs array rather than relying on a bare null
        return this.writeAndFlush(clientProtocol, (Object[]) null);
    }

    /**
     * Whether the client is connected.
     * @return {@code true} when a channel exists and is active
     */
    public boolean isConnect() {
        return this.getChannel() != null && this.getChannel().isActive();
    }

    /**
     * Writes and flushes a message when the channel is writable.
     * <p>
     * NOTE(review): fails with a NullPointerException when no channel has been created yet.
     *
     * @param msg the message to write
     * @param args not used by this implementation
     * @return the write future, or a failed future carrying an
     * {@link UnWritableProtocolException} when the channel is not writable
     */
    public ChannelFuture writeAndFlush(Object msg, Object... args) {
        if(this.getChannel().isWritable()){
            return this.getChannel().writeAndFlush(msg);
        } else {
            // The water marks can be changed through configuration, see
            // ChannelOption#WRITE_BUFFER_WATER_MARK, e.g. WriteBufferWaterMark(10000, 20000),
            // set with Bootstrap#option(ChannelOption, Object)
            return this.getChannel().newFailedFuture(new UnWritableProtocolException(msg
                    , getChannel().bytesBeforeWritable(), getChannel().bytesBeforeUnwritable()));
        }
    }

    /**
     * Hook for adding custom handlers to a new channel; this implementation does nothing.
     *
     * @param channel the channel being initialized
     */
    protected void doInitChannel(Channel channel) { }

    /**
     * Callback invoked after a successful connect.
     * @param future the successful connect future
     */
    protected void successCallback(ChannelFuture future) {
        // Publish the client online event
        FrameworkManager.publishEvent(new StatusEvent(this, ClientStatus.online, getClientComponent()));
    }

    /**
     * Creates the client socket decoder.
     * @see io.netty.handler.codec.ByteToMessageDecoder subclasses of this type must not use {@link io.netty.channel.ChannelHandler.Sharable}
     * @return the decoder added to each new channel
     */
    protected abstract ChannelInboundHandler createProtocolDecoder();

    /**
     * Creates the socket encoder.
     * @return by default the encoder implemented by the iot framework {@link ClientProtocolEncoder}
     */
    protected ChannelOutboundHandlerAdapter createProtocolEncoder() {
        return new ClientProtocolEncoder(getClientComponent());
    }

    @Override
    public SocketClientComponent getClientComponent() {
        return clientComponent;
    }

    protected void setClientComponent(SocketClientComponent clientComponent) {
        this.clientComponent = clientComponent;
    }

    /**
     * @return the name of the owning component
     */
    public String getName() {
        return this.clientComponent.getName();
    }

    public Bootstrap getBootstrap() {
        return bootstrap;
    }

    /**
     * @return the current channel, or {@code null} when none has been assigned yet
     */
    public Channel getChannel() {
        return channel;
    }

    protected SocketClient setChannel(Channel channel) {
        this.channel = channel; return this;
    }
}
